package com.atguigu.flink.chapter5.source;


import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * Demo job that attaches a Kafka consumer for the {@code sensor} topic as a
 * streaming source and prints every received string record with the
 * identifier {@code kafka}.
 *
 * @Author lizhenchao@atguigu.cn
 * @Date 2020/12/19 11:26
 */
public class Flink03_Source_Kafka {
    /**
     * Builds the Kafka-backed pipeline and submits it for execution.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job fails while executing
     */
    public static void main(String[] args) throws Exception {
        // Kafka client settings: broker list, consumer group and offset-reset policy.
        Properties kafkaConfig = new Properties();
        kafkaConfig.setProperty("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092");
        kafkaConfig.setProperty("group.id", "Flink03_Source_Kafka");
        kafkaConfig.setProperty("auto.offset.reset", "latest");

        // Consumer that deserializes each record of topic "sensor" as a plain string.
        FlinkKafkaConsumer<String> sensorConsumer =
                new FlinkKafkaConsumer<>("sensor", new SimpleStringSchema(), kafkaConfig);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> sensorStream = env.addSource(sensorConsumer);
        sensorStream.print("kafka");

        // Nothing runs until the environment is asked to execute the job graph.
        env.execute();
    }

}
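/*
Sample record for the "sensor" topic (presumably id,timestamp,value -- confirm against the producer):
sensor_1,1607527992000,20
 */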